Typed Actor System

Introduction

The actor model is a conceptual model for handling concurrent computation. Its basic unit is the actor, which has the following core properties:

  • Processes messages asynchronously
  • Has its own state, which is not shared
  • Can do any of three things when it receives a message
    • Send message to other actors
    • Create new actors
    • Modify internal state and behaviour
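The properties above can be sketched with nothing but the standard library. This is only an illustration, assuming a thread per actor and `std::sync::mpsc` as the mailbox; the names `Msg`, `spawn_counter` and `demo` are made up for this sketch and are not part of tyactor.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical message type for this sketch.
enum Msg {
    Add(i64),
    // Carries a reply channel so the actor can send a message back.
    Get(mpsc::Sender<i64>),
}

// Spawns the actor on its own thread and returns the sending half of
// its mailbox.
fn spawn_counter() -> mpsc::Sender<Msg> {
    let (tx, rx) = mpsc::channel::<Msg>();
    thread::spawn(move || {
        // Private state: owned by this thread only, never shared.
        let mut total = 0i64;
        // Messages are handled one at a time, in arrival order. The loop
        // ends when every sender has been dropped.
        for msg in rx {
            match msg {
                Msg::Add(n) => total += n,
                Msg::Get(reply) => {
                    let _ = reply.send(total);
                }
            }
        }
    });
    tx
}

// Sends two updates, then asks the actor for its current state.
fn demo() -> i64 {
    let counter = spawn_counter();
    counter.send(Msg::Add(2)).unwrap();
    counter.send(Msg::Add(40)).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    counter.send(Msg::Get(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}
```

The caller never touches `total` directly; the only way to read or change it is to send a message.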

Creating the project

We can create the Rust project with the following command (for now we simply name it tyactor, which stands for typed-actor):

cargo new --lib tyactor

This only creates a project with a library file, so we also need some way to run it. Let's add a binary file to it, inside a folder named app:

mkdir app
touch app/main.rs

Now change Cargo.toml so that it exposes the library and also lets us run an example that uses the library functions through the cargo command itself:

[lib]
name = "tyactor"
path = "src/lib.rs"

[[bin]]
name = "example"
path = "app/main.rs"

Now, once app/main.rs contains a main function, we can run it with the command

cargo run --bin example

Using Multi Producer Single Consumer Channel

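The code from here on uses tokio, so add it as a dependency first (for example with cargo add tokio --features full). Consider the following code, which creates an mpsc::channel: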

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // mpsc::channel returns a Sender (tx) and a mutable Receiver (mut rx)
    let (tx1, mut rx) = mpsc::channel(32);

    // we clone the Sender to simulate the idea of sending data from two
    // different sources, both of these senders will be pointing to the
    // same receiver
    let tx2 = tx1.clone();

    // spawns an async tokio task that sends the data to the receiver.
    // `tx1.send()` returns a future that we `await`: it waits while the
    // receiver queue is full and resolves to an `Err` if the receiver
    // has been dropped. the `move` keyword moves `tx1` into the task, so
    // it is dropped as soon as the block finishes executing, i.e. once
    // the value has been sent
    tokio::spawn(async move {
        if let Err(_) = tx1.send(3).await {
            println!("the receiver dropped");
        }
    });

    tokio::spawn(async move {
        if let Err(_) = tx2.send(4).await {
            println!("the receiver dropped");
        }
    });

    // `rx.recv().await` waits for the next message in the queue and
    // returns it as `Some(value)`. once both Senders have been dropped
    // and the queue is empty it returns `None`, which ends the loop.
    // this is why both senders are moved into the tasks above
    while let Some(msg) = rx.recv().await {
        println!("Got {:?}", msg);
    }
}
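The shutdown rule is easy to get wrong once the number of producers grows. Here is a small sketch of the same fan-in pattern, swapping in `std::sync::mpsc` and threads for tokio so that it runs without any dependency; `collect_from_producers` is a hypothetical helper used only for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Starts `n` producers that each send their own id, then collects
// everything that arrives until the channel closes.
fn collect_from_producers(n: u32) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();
    for id in 0..n {
        // Every producer gets its own clone of the sender.
        let tx = tx.clone();
        thread::spawn(move || {
            let _ = tx.send(id);
            // This clone is dropped when the closure returns.
        });
    }
    // The original sender is still alive here. Without this drop the
    // receiver would wait forever for a message that never comes.
    drop(tx);

    // `iter()` yields messages until all senders are gone.
    let mut received: Vec<u32> = rx.iter().collect();
    // Arrival order depends on scheduling, so sort for a stable result.
    received.sort();
    received
}
```

In the tokio example both senders are moved into tasks, so no explicit drop is needed there. The explicit `drop(tx)` only becomes necessary when the original sender stays in the receiving scope, as it does in this loop.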

Basic Actor framework using mpsc::channel

Let's use the previous ideas to write two structs: an Actor, which is responsible for handling the messages sent to it, and an ActorHandle, which can be used to send messages to the Actor. This code is inspired by Actors with Tokio.

use tokio::sync::mpsc;

/// The Actor struct owns the receiving end of the channel and the actor's
/// state. It receives messages and passes each one to the handle function,
/// which may modify the state
pub struct Actor<M: Send + 'static, S: Default + Send> {
    // a handle to the receiver from mpsc::channel so that we can use it to
    // receive messages
    receiver: mpsc::Receiver<M>,
    // the state of the actor that can be modified by the handle function
    state: S,
    // the message handler. the state is borrowed mutably so that the
    // function may modify it
    handle: fn(&mut S, M),
}

impl<M: Send + 'static, S: Default + Send> Actor<M, S> {
    pub fn new(rx: mpsc::Receiver<M>, f: fn(&mut S, M)) -> Self {
        Actor {
            receiver: rx,
            state: S::default(),
            handle: f,
        }
    }
    pub async fn start(mut self) {
        while let Some(msg) = self.receiver.recv().await {
            (self.handle)(&mut self.state, msg)
        }
    }
}

/// ActorHandle can be used to send messages to the respective actor.
#[derive(Clone)]
pub struct ActorHandle<M: Send + 'static> {
    // holds a handle to the sender from mpsc::channel
    id: mpsc::Sender<M>,
}

impl<M: Send + 'static> ActorHandle<M> {
    // creating a new actor only returns the handle: during its creation
    // we launch the actor and create a handle to it as well.
    pub fn new<S: Default + Send + 'static>(size: usize, f: fn(&mut S, M) -> ()) -> Self {
        let (tx, rx) = mpsc::channel(size);
        let actor: Actor<M, S> = Actor::new(rx, f);
        // run the actor's receive loop on its own tokio task
        tokio::spawn(async move {
            actor.start().await;
        });
        ActorHandle { id: tx }
    }
    // takes `&self` so that one handle can send any number of messages
    pub async fn send(&self, msg: M) {
        if let Err(e) = self.id.send(msg).await {
            eprintln!("{:?}", e);
        }
    }
}

Introducing traits

Let's capture the idea of an Actor in a trait to make this process more straightforward:

pub trait ActorTrait {
    type State: Default + Send + 'static;
    type Message: Send + 'static;
    fn handle(state: &mut Self::State, msg: Self::Message);
}

With the trait in place, the new function of ActorHandle changes into:

    pub fn new<A: ActorTrait<Message = M>>(size: usize) -> Self {
        let (tx, rx) = mpsc::channel(size);
        let actor: Actor<M, A::State> = Actor::new(rx, A::handle);
        tokio::spawn(async move {
            actor.start().await;
        });
        ActorHandle { id: tx }
    }
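To see what implementing the trait could look like, here is a self-contained sketch. It is not the tyactor code: it assumes a thread-based stand-in (`spawn_actor`, built on `std::sync::mpsc::sync_channel`) in place of the tokio-based `ActorHandle::new`, so that it runs without dependencies. `Stats`, `StatsMsg` and `demo` are hypothetical names for illustration.

```rust
use std::sync::mpsc;
use std::thread;

// Same shape as the trait from this section.
pub trait ActorTrait {
    type State: Default + Send + 'static;
    type Message: Send + 'static;
    fn handle(state: &mut Self::State, msg: Self::Message);
}

// Thread-based stand-in for `ActorHandle::new::<A>(size)`.
fn spawn_actor<A: ActorTrait>(size: usize) -> mpsc::SyncSender<A::Message> {
    let (tx, rx) = mpsc::sync_channel::<A::Message>(size);
    // Take the handler as a plain function pointer, like `Actor::new`.
    let handle: fn(&mut A::State, A::Message) = A::handle;
    let mut state: A::State = Default::default();
    thread::spawn(move || {
        // Runs until every sender has been dropped.
        for msg in rx {
            handle(&mut state, msg);
        }
    });
    tx
}

// Hypothetical actor: counts the samples it has seen and sums them.
struct Stats;

enum StatsMsg {
    Sample(i64),
    // Carries a reply channel, since state is only reachable via messages.
    Report(mpsc::Sender<(u32, i64)>),
}

impl ActorTrait for Stats {
    type State = (u32, i64); // (count, sum), starts at (0, 0)
    type Message = StatsMsg;

    fn handle(state: &mut Self::State, msg: Self::Message) {
        match msg {
            StatsMsg::Sample(v) => {
                state.0 += 1;
                state.1 += v;
            }
            StatsMsg::Report(reply) => {
                let _ = reply.send(*state);
            }
        }
    }
}

// Sends three samples and then asks for a report.
fn demo() -> (u32, i64) {
    let stats = spawn_actor::<Stats>(8);
    for v in [10, 20, 30] {
        stats.send(StatsMsg::Sample(v)).unwrap();
    }
    let (reply_tx, reply_rx) = mpsc::channel();
    stats.send(StatsMsg::Report(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}
```

With the tokio version from this page, the equivalent call would presumably be `ActorHandle::new::<Stats>(8)`, and the reply channel would more naturally be a `tokio::sync::oneshot` sender.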

Thanks

This page is still a work in progress.